{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "[![AWS SDK for pandas](_static/logo.png \"AWS SDK for pandas\")](https://github.com/aws/aws-sdk-pandas)\n",
    "\n",
    "# 35 - Distributing Calls on Ray Remote Cluster\n",
    "\n",
    "AWS SDK for pandas supports distribution of specific calls on a cluster of EC2s using [ray](https://docs.ray.io/).\n",
    "\n",
    "<div class=\"alert alert-block alert-warning\">\n",
    "Note that this tutorial creates a cluster of EC2 nodes which will incur a charge in your account. Please make sure to delete the cluster at the end.</div>\n",
    "\n",
    "#### Install the library"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "!pip install \"awswrangler[modin,ray]\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "## Configure and Build Ray Cluster on AWS\n",
    "\n",
    "#### Build Prerequisite Infrastructure\n",
    "\n",
    "Click on the link below to provision an AWS CloudFormation stack. It builds a security group and IAM instance profile for the Ray Cluster to use. A valid CIDR range (encompassing your local machine IP) and a VPC ID are required.\n",
    "\n",
    "[<img src=\"https://s3.amazonaws.com/cloudformation-examples/cloudformation-launch-stack.png\">](https://console.aws.amazon.com/cloudformation/home#/stacks/new?stackName=RayPrerequisiteInfra&templateURL=https://aws-data-wrangler-public-artifacts.s3.amazonaws.com/cloudformation/ray-prerequisite-infra.json)\n",
    "\n",
    "#### Configure Ray Cluster Configuration\n",
    "Start with a cluster configuration file (YAML)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "!touch config.yml"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Replace all values to match your desired region, account number and name of resources deployed by the above CloudFormation Stack.\n",
    "\n",
    "A limited set of AWS regions is currently supported (Python 3.8 and above). The example configuration below uses the AMI for `us-east-1`.\n",
    "\n",
    "Then edit `config.yml` file with your custom configuration."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    },
    "vscode": {
     "languageId": "raw"
    }
   },
   "source": [
    "```yaml\n",
    "cluster_name: pandas-sdk-cluster\n",
    "\n",
    "min_workers: 2\n",
    "max_workers: 2\n",
    "\n",
    "provider:\n",
    "    type: aws\n",
    "    region: us-east-1 # Change AWS region as necessary\n",
    "    availability_zone: us-east-1a,us-east-1b,us-east-1c # Change as necessary\n",
    "    security_group:\n",
    "        GroupName: ray-cluster\n",
    "    cache_stopped_nodes: False\n",
    "\n",
    "available_node_types:\n",
    "  ray.head.default:\n",
    "    node_config:\n",
    "      InstanceType: m4.xlarge\n",
    "      IamInstanceProfile:\n",
    "        # Replace with your account id and profile name if you did not use the default value\n",
    "        Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster\n",
    "      # Replace ImageId if using a different region / python version\n",
    "      ImageId: ami-02ea7c238b7ba36af\n",
    "      TagSpecifications:  # Optional tags\n",
    "        - ResourceType: \"instance\"\n",
    "          Tags:\n",
    "              - Key: Platform\n",
    "                Value: \"ray\"\n",
    "\n",
    "  ray.worker.default:\n",
    "      min_workers: 2\n",
    "      max_workers: 2\n",
    "      node_config:\n",
    "        InstanceType: m4.xlarge\n",
    "        IamInstanceProfile:\n",
    "          # Replace with your account id and profile name if you did not use the default value\n",
    "          Arn: arn:aws:iam::{ACCOUNT ID}:instance-profile/ray-cluster\n",
    "        # Replace ImageId if using a different region / python version\n",
    "        ImageId: ami-02ea7c238b7ba36af\n",
    "        TagSpecifications:  # Optional tags\n",
    "          - ResourceType: \"instance\"\n",
    "            Tags:\n",
    "                - Key: Platform\n",
    "                  Value: \"ray\"\n",
    "\n",
    "\n",
    "setup_commands:\n",
    "- pip install \"awswrangler[modin,ray]\"\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "#### Provision Ray Cluster\n",
    "\n",
    "The command below creates a Ray cluster in your account based on the aforementioned config file. It consists of one head node and 2 workers (m4xlarge EC2s). The command takes a few minutes to complete."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "!ray up -y config.yml"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Once the cluster is up and running, we set the `RAY_ADDRESS` environment variable to the head node Ray Cluster Address"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import os\n",
    "import subprocess\n",
    "\n",
    "head_node_ip = subprocess.check_output([\"ray\", \"get-head-ip\", \"config.yml\"]).decode(\"utf-8\").split(\"\\n\")[-2]\n",
    "os.environ[\"RAY_ADDRESS\"] = f\"ray://{head_node_ip}:10001\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "As a result, `awswrangler` API calls now run on the cluster, not on your local machine. The SDK detects the required dependencies for its distributed mode and parallelizes supported methods on the cluster."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "import modin.pandas as pd\n",
    "\n",
    "import awswrangler as wr\n",
    "\n",
    "print(f\"Execution engine: {wr.engine.get()}\")\n",
    "print(f\"Memory format: {wr.memory_format.get()}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "Enter bucket Name"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "bucket = \"BUCKET_NAME\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "#### Read & write some data at scale on the cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "pycharm": {
     "name": "#%%\n"
    }
   },
   "outputs": [],
   "source": [
    "# Read last 3 months of Taxi parquet compressed data (400 Mb)\n",
    "df = wr.s3.read_parquet(path=\"s3://ursa-labs-taxi-data/2018/1*.parquet\")\n",
    "df[\"month\"] = df[\"pickup_at\"].dt.month\n",
    "\n",
    "# Write it back to S3 partitioned by month\n",
    "path = f\"s3://{bucket}/taxi-data/\"\n",
    "database = \"ray_test\"\n",
    "wr.catalog.create_database(name=database, exist_ok=True)\n",
    "table = \"nyc_taxi\"\n",
    "\n",
    "wr.s3.to_parquet(\n",
    "    df=df,\n",
    "    path=path,\n",
    "    dataset=True,\n",
    "    database=database,\n",
    "    table=table,\n",
    "    partition_cols=[\"month\"],\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read it back via Athena UNLOAD\n",
    "\n",
    "The [UNLOAD](https://docs.aws.amazon.com/athena/latest/ug/unload.html) command distributes query processing in Athena to dump results in S3 which are then read in parallel into a dataframe"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "unload_path = f\"s3://{bucket}/unload/nyc_taxi/\"\n",
    "\n",
    "# Athena UNLOAD requires that the S3 path is empty\n",
    "# Note that s3.delete_objects is also a distributed call\n",
    "wr.s3.delete_objects(unload_path)\n",
    "\n",
    "wr.athena.read_sql_query(\n",
    "    f\"SELECT * FROM {table}\",\n",
    "    database=database,\n",
    "    ctas_approach=False,\n",
    "    unload_approach=True,\n",
    "    s3_output=unload_path,\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<div class=\"alert alert-block alert-warning\">\n",
    "The EC2 cluster must be terminated or it will incur a charge.\n",
    "</div>"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!ray down -y ./config.yml"
   ]
  },
  {
   "attachments": {},
   "cell_type": "markdown",
   "metadata": {
    "pycharm": {
     "name": "#%% md\n"
    }
   },
   "source": [
    "[More Info on Ray Clusters on AWS](https://docs.ray.io/en/latest/cluster/vms/getting-started.html#launch-a-cluster-on-a-cloud-provider)"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3.8.5 ('awswrangler-mo8sEp3D-py3.8')",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.8.5"
  },
  "vscode": {
   "interpreter": {
    "hash": "350d68fa765a50d15f89103ff6102f3b96ae3e7bdc6e5e7a4956e4c1d21b94bd"
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
